import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.PullConsumer;
import io.openmessaging.demo.DefaultKeyValue;
import io.openmessaging.demo.DefaultPullConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Author :  Rocky
 * Date : 24/05/2017 15:00
 * Description : Manual benchmark driver. Starts a fixed number of consumer threads, each of which
 * attaches a {@link PullConsumer} to its own group of topics and polls until {@code poll()}
 * returns {@code null}; then prints the total number of messages read, the elapsed time and
 * the throughput.
 * Test : run {@code main}, optionally passing the store path as the first program argument.
 */
public class ConsumerTest {

    /** Store path used when no path is supplied on the command line. */
    private static final String DEFAULT_STORE_PATH = "/Users/zuji/Documents/project/open-messaging-demo";

    /** Number of consumer threads to start; also the number of topic groups. */
    private static final int CONSUMER_COUNT = 10;

    /** Number of topics attached to each consumer. */
    private static final int TOPICS_PER_CONSUMER = 10;

    /**
     * Runs the benchmark and prints a one-line summary to standard output.
     *
     * @param args optional; {@code args[0]} is the store path, falling back to
     *             {@link #DEFAULT_STORE_PATH} when absent or {@code null}
     * @throws InterruptedException if the main thread is interrupted while waiting for the
     *                              consumer threads to finish
     */
    public static void main(String[] args) throws InterruptedException {
        boolean hasPathArg = args != null && args.length > 0 && args[0] != null;
        String path = hasPathArg ? args[0] : DEFAULT_STORE_PATH;

        KeyValue kvs = new DefaultKeyValue();
        kvs.put("STORE_PATH", path);

        List<List<String>> topicList = buildTopicGroups(CONSUMER_COUNT, TOPICS_PER_CONSUMER);
        CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT);
        AtomicInteger totalReadedMsgCount = new AtomicInteger(0);

        long s = System.currentTimeMillis();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            List<String> topics = topicList.get(i);
            Thread t = new Thread(() -> {
                try {
                    PullConsumer consumer = new DefaultPullConsumer(kvs);
                    consumer.attachQueue(UUID.randomUUID().toString(), topics);

                    // Drain the attached topics; a null message ends this consumer's loop.
                    while (consumer.poll() != null) {
                        totalReadedMsgCount.incrementAndGet();
                    }
                } finally {
                    // Always release the latch so a failing consumer cannot hang main forever.
                    latch.countDown();
                }
            });
            t.start();
        }

        latch.await();
        long elapsedMs = System.currentTimeMillis() - s;

        // Work in milliseconds with floating-point math: the former (elapsed / 1000) integer
        // divisor was 0 for runs shorter than one second (ArithmeticException) and truncated
        // fractional seconds otherwise. Clamp to 1 ms so the division is always defined.
        long kbPerSecond = (long) (DefaultPullConsumer.wroteSize / 1024.0 * 1000.0 / Math.max(1L, elapsedMs));

        System.out.println("应该拉取4kw消息，实际拉取 " + totalReadedMsgCount.get() + " 消息，耗时 " + elapsedMs + " ms，总字节 " + DefaultPullConsumer.wroteSize + " byte，速度 " + kbPerSecond + " kb/s");
    }

    /**
     * Builds {@code groupCount} lists of topic names, numbered consecutively across groups:
     * the first group holds {@code topic-1 .. topic-N}, the second {@code topic-(N+1) ..}, etc.
     *
     * @param groupCount     number of groups to create
     * @param topicsPerGroup number of topic names in each group
     * @return the topic groups, in creation order
     */
    private static List<List<String>> buildTopicGroups(int groupCount, int topicsPerGroup) {
        List<List<String>> groups = new ArrayList<>(groupCount);
        int topicNumber = 0;
        for (int g = 0; g < groupCount; g++) {
            List<String> group = new ArrayList<>(topicsPerGroup);
            for (int t = 0; t < topicsPerGroup; t++) {
                topicNumber++;
                group.add("topic-" + topicNumber);
            }
            groups.add(group);
        }
        return groups;
    }
}
